package kafka

import (
	"errors"
	"fmt"
	"github.com/Shopify/sarama"
	"log"
	"time"
)

// SyncProducer 同步消息模式
func SyncProducer(broker []string, topic, message string) error {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Timeout = 5 * time.Second
	p, err := sarama.NewSyncProducer(broker, config)
	if err != nil {
		return errors.New(fmt.Sprintf("sarama.NewSyncProducer err, message=%s \n", err))
	}
	defer p.Close()
	msg := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(message),
	}
	part, offset, err := p.SendMessage(msg)
	if err != nil {
		return errors.New(fmt.Sprintf("send sdsds err=%s \n", err))
	} else {
		fmt.Printf("发送成功，partition=%d, offset=%d \n", part, offset)
		return nil
	}
}

// Producer wraps either a synchronous or an asynchronous sarama producer
// bound to a single topic; the sync flag records which mode was chosen
// at construction time.
//
// NOTE(review): sarama.AsyncProducer and sarama.SyncProducer are interface
// types, so storing pointers to them is an anti-pattern — interfaces are
// already reference-like and every use site must dereference. Changing the
// field types would require touching every method, so it is only flagged here.
type Producer struct {
	brokers       []string               // Kafka broker addresses ("host:port")
	topic         string                 // topic all messages are published to
	asyncProducer *sarama.AsyncProducer  // set by initAsync when sync == false
	syncProducer  *sarama.SyncProducer   // set by initSync when sync == true
	sync          bool                   // true → synchronous, false → asynchronous
}

// NewKafkaProducer builds a Producer for the given brokers and topic and
// initialises it in synchronous or asynchronous mode according to sync.
// The *Producer is returned even when initialisation fails (the init
// functions report failure via their bool result, which is discarded here).
func NewKafkaProducer(brokers []string, topic string, sync bool) *Producer {
	p := &Producer{
		brokers: brokers,
		topic:   topic,
		sync:    sync,
	}
	if !sync {
		p.initAsync()
		return p
	}
	p.initSync()
	return p
}

// initAsync creates the underlying sarama.AsyncProducer and starts a
// goroutine that drains its Successes/Errors channels — both must be read
// when Return.Successes/Return.Errors are enabled, or the producer
// eventually blocks. It reports whether initialisation succeeded.
func (k *Producer) initAsync() bool {
	if k.sync {
		fmt.Printf("sync producer cant call async func ！\n")
		return false
	}
	config := sarama.NewConfig()
	// Wait until all in-sync replicas have acknowledged each message.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Send every message to a randomly chosen partition.
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// Deliver success/error notifications; only meaningful when
	// RequiredAcks is not NoResponse.
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	// Kafka protocol version: below V0_10_0_0 message timestamps are
	// ignored, and a mismatched version yields confusing broker errors
	// and undeliverable messages. Producer and consumer must agree.
	config.Version = sarama.V0_10_0_1

	producer, err := sarama.NewAsyncProducer(k.brokers, config)
	if err != nil {
		fmt.Println(err)
		return false
	}
	// BUG FIX: the original deferred producer.AsyncClose() here, which shut
	// the producer down as soon as initAsync returned — every later
	// SendMsgAsync then wrote to a closed producer. The producer must stay
	// open for the lifetime of k.
	k.asyncProducer = &producer

	// NOTE(review): this goroutine has no stop signal and runs forever;
	// acceptable while the producer lives for the whole process lifetime.
	go func() {
		for {
			select {
			case <-producer.Successes():
				// Success metadata is discarded; add logging here if needed.
			case fail := <-producer.Errors():
				fmt.Printf("err: %s  \n", fail.Err.Error())
			}
		}
	}()

	return true
}

// initSync creates the underlying sarama.SyncProducer and reports whether
// initialisation succeeded. On failure k.syncProducer is left nil.
func (k *Producer) initSync() bool {
	if !k.sync {
		fmt.Println("async producer cant call sync func ！")
		return false
	}

	config := sarama.NewConfig()
	// SendMessage requires Return.Successes so delivery can be confirmed.
	config.Producer.Return.Successes = true
	config.Producer.Timeout = 5 * time.Second
	p, err := sarama.NewSyncProducer(k.brokers, config)
	if err != nil {
		log.Printf("sarama.NewSyncProducer err, message=%s \n", err)
		return false
	}
	// BUG FIX: assign only after the error check; the original stored &p
	// before checking err, leaving a pointer to a nil producer on failure.
	k.syncProducer = &p
	return true
}

// SendMsgAsync enqueues sendStr on the async producer's input channel.
// Delivery outcomes are consumed by the goroutine started in initAsync.
// Assumes initAsync succeeded; k.asyncProducer must be non-nil.
func (k *Producer) SendMsgAsync(sendStr string) {
	producer := *k.asyncProducer
	// The string payload is encoded as raw bytes.
	producer.Input() <- &sarama.ProducerMessage{
		Topic: k.topic,
		Value: sarama.ByteEncoder(sendStr),
	}
}

// SendMsgSync publishes sendStr synchronously and reports whether the
// broker acknowledged it; the outcome is also printed to stdout.
// Assumes initSync succeeded; k.syncProducer must be non-nil.
func (k *Producer) SendMsgSync(sendStr string) bool {
	msg := &sarama.ProducerMessage{
		Topic: k.topic,
		Value: sarama.ByteEncoder(sendStr),
	}
	pd := *k.syncProducer
	part, offset, err := pd.SendMessage(msg)
	// Early return keeps the happy path left-aligned (no else after return).
	if err != nil {
		fmt.Printf("发送失败 send  message(%s) err=%s \n", sendStr, err)
		return false
	}
	fmt.Printf("发送成功 partition=%d, offset=%d \n", part, offset)
	return true
}
